#include "thread_pool.h"
#include "log.h"

#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <stdio.h>
 
/* 工作者线程函数, 从任务链表中取出任务并执行 */
static void* thread_routine(void *arg)
{
    tpool_t *tpool = (tpool_t *)arg;
    tpool_work_t *work;
    void * thd_ctx = NULL;
    
    INFO("thread_routine startup success");
    while(1) {
        /* 如果线程池没有被销毁且没有任务要执行，则等待 */
        pthread_mutex_lock(&tpool->queue_lock);
        while(!tpool->queue_head && !tpool->shutdown) {
            pthread_cond_wait(&tpool->queue_cond_can_get, &tpool->queue_lock);
        }
        if (tpool->shutdown) {
            pthread_mutex_unlock(&tpool->queue_lock);
            if(thd_ctx){
                tpool->thd_ctx_fini_func(tpool->context,thd_ctx);
            }
            pthread_exit(NULL);
        }
        if(!thd_ctx){
            tpool->thd_ctx_init_func(tpool->context,&thd_ctx);
        }
        INFO("get task");
        work = tpool->queue_head;
        tpool->queue_head = tpool->queue_head->next;
        pthread_mutex_unlock(&tpool->queue_lock);
 
        work->routine(tpool->context,&thd_ctx,work->arg);
        free(work);
    }
    
    if(thd_ctx){
        tpool->thd_ctx_fini_func(tpool->context,thd_ctx);
    }
    return NULL;   
}
 
/*
 * 创建线程池 
 */
tpool_t * tpool_create(int max_thr_num, void * context, 
    void*(*thd_ctx_init_func)(void *,void**), void*(*thd_ctx_fini_func)(void *,void*))
{
    int i;
    tpool_t *tpool = NULL;
    
    tpool = (tpool_t *)calloc(1, sizeof(tpool_t));
    if (!tpool) {
        ERROR("tpool calloc failed");
        return NULL;
    }
    
    /* 初始化 */
    tpool->max_thr_num = max_thr_num;
    tpool->shutdown = 0;
    tpool->queue_head = NULL;
    tpool->context = context;
    tpool->thd_ctx_init_func = thd_ctx_init_func;
    tpool->thd_ctx_fini_func = thd_ctx_fini_func;
    if (pthread_mutex_init(&tpool->queue_lock, NULL) !=0) {
        ERROR("pthread_mutex_init failed, errno:%d, error:%s", errno, strerror(errno) );
        free(tpool);
        return NULL;
    }
    if (pthread_cond_init(&tpool->queue_cond_can_get, NULL) !=0 ) {
        ERROR("pthread_cond_init failed, errno:%d, error:%s", errno, strerror(errno) );
        free(tpool);
        return NULL;
    }
    if (pthread_cond_init(&tpool->queue_cond_can_put, NULL) !=0 ) {
        ERROR("pthread_cond_init failed, errno:%d, error:%s", errno, strerror(errno) );
        free(tpool);
        return NULL;
    }
    
    /* 创建工作者线程 */
    tpool->thr_id = (pthread_t *)calloc(max_thr_num, sizeof(pthread_t));
    if (!tpool->thr_id) {
        ERROR("pthread_t calloc failed");
        free(tpool);
        return NULL;
    }
    
    pthread_attr_t attr;
	pthread_attr_init (&attr);
	pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);  //avoid memory leak
    for (i = 0; i < max_thr_num; ++i) {
        if (pthread_create(&tpool->thr_id[i], &attr, thread_routine, (void *)tpool) != 0){
            ERROR("pthread_create failed, errno:%d, error:%s", errno, strerror(errno));
            free(tpool->thr_id);
            free(tpool);
            return NULL;
        }
    }
    
    return tpool;
}


/* 销毁线程池 */
void tpool_destroy(tpool_t * tpool)
{
    int i;
    tpool_work_t *member;
 
    if (tpool->shutdown) {
        return;
    }
    tpool->shutdown = 1;
 
    /* 通知所有正在等待的线程 */
    pthread_mutex_lock(&tpool->queue_lock);
    pthread_cond_broadcast(&tpool->queue_cond_can_get);
    pthread_mutex_unlock(&tpool->queue_lock);
    for (i = 0; i < tpool->max_thr_num; ++i) {
        pthread_join(tpool->thr_id[i], NULL);
    }
    free(tpool->thr_id);
 
    while(tpool->queue_head) {
        member = tpool->queue_head;
        tpool->queue_head = tpool->queue_head->next;
        free(member);
    }
    
    pthread_mutex_destroy(&tpool->queue_lock);    
    pthread_cond_destroy(&tpool->queue_cond_can_get);
    pthread_cond_destroy(&tpool->queue_cond_can_put);
    
    free(tpool);    
}


/* 向线程池添加任务 */
int tpool_add_work(tpool_t * tpool, void*(*routine)(void*,void **,void*), void *arg)
{
    tpool_work_t *work, *member;
    
    if (!routine){
        ERROR("Invalid argument");
        return -1;
    }
    
    work = (tpool_work_t *)malloc(sizeof(tpool_work_t));
    if (!work) {
        ERROR("tpool_work_t malloc failed");
        return -1;
    }
    work->routine = routine;
    work->arg = arg;
    work->next = NULL;
 
    pthread_mutex_lock(&tpool->queue_lock);
    member = tpool->queue_head;
    if (!member) {
        tpool->queue_head = work;
    } else {
        while(member->next) {
            member = member->next;
        }
        member->next = work;
    }
    /* 通知工作者线程，有新任务添加 */
    pthread_cond_broadcast(&tpool->queue_cond_can_get);
    pthread_mutex_unlock(&tpool->queue_lock);
    INFO("put task");
    
    return 0;    
}
